package com.niit.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.Random

object Spark_Stream_DIY {

  /**
   * Entry point: starts a local Spark Streaming job that reads from
   * [[MyReceiver]] and prints the received messages.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Local master using all available cores; batches are cut every 3 seconds.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("sparkStream")
    val streamingContext = new StreamingContext(conf, Seconds(3))
    streamingContext.sparkContext.setLogLevel("ERROR")

    // Create a stream backed by the custom Receiver.
    val receivedMessages: ReceiverInputDStream[String] =
      streamingContext.receiverStream(new MyReceiver())

    receivedMessages.print()

    // Start processing and block until the streaming context terminates.
    streamingContext.start()
    streamingContext.awaitTermination()
  }

  /**
   * Custom data receiver.
   *
   * 1. Extends `Receiver`, fixing the element type to `String` and passing the
   *    storage level to the parent constructor.
   * 2. Overrides `onStart` / `onStop`.
   *
   * Every two seconds a background thread stores one message containing a
   * random digit (0-9). The loop is driven by the receiver's own
   * `isStopped()` state rather than a private flag, so a stop request is
   * visible to the producer thread and the receiver works again if it is
   * restarted (a hand-rolled flag set to `false` in `onStop` would never be
   * reset).
   */
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    // Handle to the producer thread so onStop() can interrupt it.
    // Written by onStart()/onStop(); @volatile so the write is visible across threads.
    @volatile private var producerThread: Option[Thread] = None

    /**
     * Called when the receiver starts. Must not block, so data generation is
     * handed off to a separate thread.
     */
    override def onStart(): Unit = {
      val thread = new Thread("MyReceiver-producer") {
        override def run(): Unit = produce()
      }
      // Daemon so a lingering producer can never keep the JVM alive.
      thread.setDaemon(true)
      producerThread = Some(thread)
      thread.start()
    }

    /**
     * Called when the receiver stops. Interrupts the producer so it leaves
     * its sleep immediately instead of lingering for up to two seconds.
     */
    override def onStop(): Unit = {
      producerThread.foreach(_.interrupt())
      producerThread = None
    }

    /** Generates messages asynchronously until the receiver is stopped or the thread is interrupted. */
    private def produce(): Unit = {
      // One generator for the whole loop instead of a new one per message.
      val random = new Random()
      try {
        while (!isStopped() && !Thread.currentThread().isInterrupted) {
          val message = "采集的数据为：" + random.nextInt(10).toString
          store(message) // hand the record over to Spark
          Thread.sleep(2000)
        }
      } catch {
        // Interrupted by onStop(): treat as a normal shutdown request.
        case _: InterruptedException => ()
      }
    }
  }

}
